/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.bff.gaia.unified.sdk.io.parquet;

import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;

import com.bff.gaia.unified.sdk.annotations.Experimental;
import com.bff.gaia.unified.sdk.coders.AvroCoder;
import com.bff.gaia.unified.sdk.coders.StringUtf8Coder;
import com.bff.gaia.unified.sdk.io.FileIO;
import com.bff.gaia.unified.sdk.io.fs.ResourceId;
import com.bff.gaia.unified.sdk.options.ValueProvider;
import com.bff.gaia.unified.sdk.transforms.Create;
import com.bff.gaia.unified.sdk.transforms.DoFn;
import com.bff.gaia.unified.sdk.transforms.PTransform;
import com.bff.gaia.unified.sdk.transforms.ParDo;
import com.bff.gaia.unified.sdk.transforms.display.DisplayData;
import com.bff.gaia.unified.sdk.values.PBegin;
import com.bff.gaia.unified.sdk.values.PCollection;
import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.DelegatingSeekableInputStream;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;
import org.apache.parquet.io.SeekableInputStream;

/**
 * IO to read and write Parquet files.
 *
 * <h3>Reading Parquet files</h3>
 *
 * <p>The {@link ParquetIO} source returns a {@link PCollection} for Parquet files. The elements in
 * the {@link PCollection} are Avro {@link GenericRecord}s.
 *
 * <p>To configure the {@link Read}, you have to provide the file patterns (from) of the Parquet
 * files and the schema.
 *
 * <p>For example:
 *
 * <pre>{@code
 * PCollection<GenericRecord> records = pipeline.apply(ParquetIO.read(SCHEMA).from("/foo/bar"));
 * ...
 * }</pre>
 *
 * <p>As {@link Read} is based on {@link FileIO}, it supports any filesystem (hdfs, ...).
 *
 * <p>For more advanced use cases, like reading each file in a {@link PCollection} of {@link
 * FileIO.ReadableFile}, use the {@link ReadFiles} transform.
 *
 * <p>For example:
 *
 * <pre>{@code
 * PCollection<FileIO.ReadableFile> files = pipeline
 *   .apply(FileIO.match().filepattern(options.getInputFilepattern()))
 *   .apply(FileIO.readMatches());
 *
 * PCollection<GenericRecord> output = files.apply(ParquetIO.readFiles(SCHEMA));
 * }</pre>
 *
 * <h3>Writing Parquet files</h3>
 *
 * <p>{@link ParquetIO.Sink} allows you to write a {@link PCollection} of {@link GenericRecord}
 * into a Parquet file. It is meant to be used with the general-purpose {@link FileIO} transforms,
 * specifically FileIO.write and FileIO.writeDynamic.
 *
 * <p>By default, {@link ParquetIO.Sink} produces output files that are compressed using the {@link
 * org.apache.parquet.format.CompressionCodec#SNAPPY} codec. This default can be changed or
 * overridden using {@link ParquetIO.Sink#withCompressionCodec(CompressionCodecName)}.
 *
 * <p>For example:
 *
 * <pre>{@code
 * pipeline
 *   .apply(...) // PCollection<GenericRecord>
 *   .apply(FileIO.<GenericRecord>write()
 *     .via(ParquetIO.sink(SCHEMA)
 *       .withCompressionCodec(CompressionCodecName.SNAPPY))
 *     .to("destination/path"));
 * }</pre>
 *
 * <p>This IO API is considered experimental and may break or receive backwards-incompatible
 * changes in future versions of the Apache Unified SDK.
 */
@Experimental(Experimental.Kind.SOURCE_SINK)
public class ParquetIO {

  /**
   * Reads {@link GenericRecord} from a Parquet file (or multiple Parquet files matching the
   * pattern).
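   *
   * <p>For example (a minimal sketch mirroring the class-level docs; {@code pipeline} and the Avro
   * schema {@code SCHEMA} are assumed to be defined):
   *
   * <pre>{@code
   * PCollection<GenericRecord> records = pipeline.apply(ParquetIO.read(SCHEMA).from("/foo/bar"));
   * }</pre>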
   */
  public static Read read(Schema schema) {
    return new AutoValue_ParquetIO_Read.Builder().setSchema(schema).build();
  }

  /**
   * Like {@link #read(Schema)}, but reads each file in a {@link PCollection} of {@link
   * FileIO.ReadableFile}, which allows more flexible usage.
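   *
   * <p>For example (a minimal sketch mirroring the class-level docs; {@code options} is assumed to
   * provide the input filepattern):
   *
   * <pre>{@code
   * PCollection<FileIO.ReadableFile> files = pipeline
   *   .apply(FileIO.match().filepattern(options.getInputFilepattern()))
   *   .apply(FileIO.readMatches());
   *
   * PCollection<GenericRecord> output = files.apply(ParquetIO.readFiles(SCHEMA));
   * }</pre>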
   */
  public static ReadFiles readFiles(Schema schema) {
    return new AutoValue_ParquetIO_ReadFiles.Builder().setSchema(schema).build();
  }

  /** Implementation of {@link #read(Schema)}. */
  @AutoValue
  public abstract static class Read extends PTransform<PBegin, PCollection<GenericRecord>> {

    @Nullable
    abstract ValueProvider<String> getFilepattern();

    @Nullable
    abstract Schema getSchema();

    abstract Builder toBuilder();

    @AutoValue.Builder
    abstract static class Builder {
      abstract Builder setFilepattern(ValueProvider<String> filepattern);

      abstract Builder setSchema(Schema schema);

      abstract Read build();
    }

    /** Reads from the given filename or filepattern. */
    public Read from(ValueProvider<String> filepattern) {
      return toBuilder().setFilepattern(filepattern).build();
    }

    /** Like {@link #from(ValueProvider)}. */
    public Read from(String filepattern) {
      return from(ValueProvider.StaticValueProvider.of(filepattern));
    }

    @Override
    public PCollection<GenericRecord> expand(PBegin input) {
      checkNotNull(getFilepattern(), "Filepattern cannot be null.");

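      // Materialize the filepattern, expand it into matched files, and read each matched file.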
      return input
          .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
          .apply(FileIO.matchAll())
          .apply(FileIO.readMatches())
          .apply(readFiles(getSchema()));
    }

    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
      super.populateDisplayData(builder);
      builder.add(
          DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"));
    }
  }

  /** Implementation of {@link #readFiles(Schema)}. */
  @AutoValue
  public abstract static class ReadFiles
      extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<GenericRecord>> {

    @Nullable
    abstract Schema getSchema();

    @AutoValue.Builder
    abstract static class Builder {
      abstract Builder setSchema(Schema schema);

      abstract ReadFiles build();
    }

    @Override
    public PCollection<GenericRecord> expand(PCollection<FileIO.ReadableFile> input) {
      checkNotNull(getSchema(), "Schema cannot be null");
      return input.apply(ParDo.of(new ReadFn())).setCoder(AvroCoder.of(getSchema()));
    }

    static class ReadFn extends DoFn<FileIO.ReadableFile, GenericRecord> {

      @ProcessElement
      public void processElement(ProcessContext processContext) throws Exception {
        FileIO.ReadableFile file = processContext.element();

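        // Parquet places its footer (schema and row-group offsets) at the end of the file, so
        // reading requires efficient random access rather than a purely sequential stream.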
        if (!file.getMetadata().isReadSeekEfficient()) {
          ResourceId filename = file.getMetadata().resourceId();
          throw new RuntimeException(String.format("File has to be seekable: %s", filename));
        }

        SeekableByteChannel seekableByteChannel = file.openSeekable();

        try (ParquetReader<GenericRecord> reader =
            AvroParquetReader.<GenericRecord>builder(new UnifiedParquetInputFile(seekableByteChannel))
                .build()) {
          GenericRecord read;
          while ((read = reader.read()) != null) {
            processContext.output(read);
          }
        }
      }
    }

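    /** Adapts a {@link SeekableByteChannel} to Parquet's {@link InputFile} interface. */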
    private static class UnifiedParquetInputFile implements InputFile {

      private SeekableByteChannel seekableByteChannel;

      UnifiedParquetInputFile(SeekableByteChannel seekableByteChannel) {
        this.seekableByteChannel = seekableByteChannel;
      }

      @Override
      public long getLength() throws IOException {
        return seekableByteChannel.size();
      }

      @Override
      public SeekableInputStream newStream() {
        return new DelegatingSeekableInputStream(Channels.newInputStream(seekableByteChannel)) {

          @Override
          public long getPos() throws IOException {
            return seekableByteChannel.position();
          }

          @Override
          public void seek(long newPos) throws IOException {
            seekableByteChannel.position(newPos);
          }
        };
      }
    }
  }

  /**
   * Creates a {@link Sink} for use with {@link FileIO#write} and {@link FileIO#writeDynamic}.
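   *
   * <p>For example (a minimal sketch mirroring the class-level docs):
   *
   * <pre>{@code
   * records.apply(FileIO.<GenericRecord>write()
   *     .via(ParquetIO.sink(SCHEMA))
   *     .to("destination/path"));
   * }</pre>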
   */
  public static Sink sink(Schema schema) {
    return new AutoValue_ParquetIO_Sink.Builder()
        .setJsonSchema(schema.toString())
        .setCompressionCodec(CompressionCodecName.SNAPPY)
        .build();
  }

  /** Implementation of {@link #sink}. */
  @AutoValue
  public abstract static class Sink implements FileIO.Sink<GenericRecord> {

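    // The schema is stored as a JSON string so that the Sink itself stays serializable.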
    @Nullable
    abstract String getJsonSchema();

    abstract CompressionCodecName getCompressionCodec();

    abstract Builder toBuilder();

    @AutoValue.Builder
    abstract static class Builder {
      abstract Builder setJsonSchema(String jsonSchema);

      abstract Builder setCompressionCodec(CompressionCodecName compressionCodec);

      abstract Sink build();
    }

    /** Specifies the compression codec. By default, {@code CompressionCodecName.SNAPPY}. */
    public Sink withCompressionCodec(CompressionCodecName compressionCodecName) {
      return toBuilder().setCompressionCodec(compressionCodecName).build();
    }

    @Nullable
    private transient ParquetWriter<GenericRecord> writer;

    @Override
    public void open(WritableByteChannel channel) throws IOException {
      checkNotNull(getJsonSchema(), "Schema cannot be null");

      Schema schema = new Schema.Parser().parse(getJsonSchema());

      UnifiedParquetOutputFile unifiedParquetOutputFile =
          new UnifiedParquetOutputFile(Channels.newOutputStream(channel));

      this.writer =
          AvroParquetWriter.<GenericRecord>builder(unifiedParquetOutputFile)
              .withSchema(schema)
              .withCompressionCodec(getCompressionCodec())
              .withWriteMode(OVERWRITE)
              .build();
    }

    @Override
    public void write(GenericRecord element) throws IOException {
      checkNotNull(writer, "Writer cannot be null");
      writer.write(element);
    }

    @Override
    public void flush() throws IOException {
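      // A ParquetWriter cannot be flushed without finalizing the file: closing it is the only
      // way to force out buffered data and write the Parquet footer.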
      writer.close();
    }

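    /** Adapts a plain {@link OutputStream} to Parquet's {@link OutputFile} interface. */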
    private static class UnifiedParquetOutputFile implements OutputFile {

      private OutputStream outputStream;

      UnifiedParquetOutputFile(OutputStream outputStream) {
        this.outputStream = outputStream;
      }

      @Override
      public PositionOutputStream create(long blockSizeHint) {
        return new UnifiedOutputStream(outputStream);
      }

      @Override
      public PositionOutputStream createOrOverwrite(long blockSizeHint) {
        return new UnifiedOutputStream(outputStream);
      }

      @Override
      public boolean supportsBlockSize() {
        return false;
      }

      @Override
      public long defaultBlockSize() {
        return 0;
      }
    }

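    /**
     * Wraps an {@link OutputStream} while tracking the write position, which Parquet uses (via
     * {@link PositionOutputStream#getPos()}) to record column-chunk offsets in the file footer.
     */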
    private static class UnifiedOutputStream extends PositionOutputStream {
      private long position = 0;
      private OutputStream outputStream;

      private UnifiedOutputStream(OutputStream outputStream) {
        this.outputStream = outputStream;
      }

      @Override
      public long getPos() throws IOException {
        return position;
      }

      @Override
      public void write(int b) throws IOException {
        position++;
        outputStream.write(b);
      }

      @Override
      public void write(byte[] b) throws IOException {
        write(b, 0, b.length);
      }

      @Override
      public void write(byte[] b, int off, int len) throws IOException {
        outputStream.write(b, off, len);
        position += len;
      }

      @Override
      public void flush() throws IOException {
        outputStream.flush();
      }

      @Override
      public void close() throws IOException {
        outputStream.close();
      }
    }
  }

  /** Disallow construction of utility class. */
  private ParquetIO() {}
}